/*package com.easy.mq.listener.adapter;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.easy.mq.config.consumer.ManagerConsumerConfig;
import com.easy.mq.config.consumer.RabbitConsumerConfig;
import com.easy.mq.enums.ConsumeResultStatus;
import com.easy.mq.enums.XT;
import com.easy.mq.event.MQEvent;
import com.easy.mq.listener.AbstractMessageListenerContainer;
import com.easy.mq.listener.ManagerListener;
import com.easy.mq.serialization.SerializationFactory;
import com.easy.mq.utils.StringUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class RabbitLoopListenerAdapter<T> extends AbstractMessageListenerContainer {

	private static final Logger logger = LoggerFactory.getLogger(RabbitLoopListenerAdapter.class);

	private ManagerListener<T> managerListener;

	private ManagerConsumerConfig<T> managerConsumerConfig;

	public RabbitLoopListenerAdapter(ManagerListener<T> managerConfig, ManagerConsumerConfig<T> managerConsumerConfig) {
		this.managerListener = managerConfig;
		this.managerConsumerConfig = managerConsumerConfig;
	}

	@Override
	public void registerMessageListener() {
		ListenerConsumer consumer = new ListenerConsumer();
		Thread thread = new Thread(consumer);
		thread.setName("listenerConsumer.");
		thread.setDaemon(true);
		thread.start();
	}

	class ListenerConsumer implements Runnable {

		private Timer timer = new Timer("ListenerConsumer-timer", true);

		private volatile ListenerInvoker invoker;

		private volatile Future<?> listenerInvokerFuture;

		@Override
		public void run() {
			startInvoker();
			timer.schedule(new DaemonTask(this), 10 * 1000, 10 * 1000);
		}

		private void startInvoker() {
			setInvoker(new ListenerInvoker());
			ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();
			setListenerInvokerFuture(fixedThreadPool.submit(ListenerConsumer.this.invoker));
		}

		public ListenerInvoker getInvoker() {
			return invoker;
		}

		public void setInvoker(ListenerInvoker invoker) {
			this.invoker = invoker;
		}

		public Future<?> getListenerInvokerFuture() {
			return listenerInvokerFuture;
		}

		public void setListenerInvokerFuture(Future<?> listenerInvokerFuture) {
			this.listenerInvokerFuture = listenerInvokerFuture;
		}

	}

	class DaemonTask extends TimerTask {

		ListenerConsumer consumer;

		DaemonTask(ListenerConsumer consumer) {
			this.consumer = consumer;
		}

		@Override
		public void run() {
			if (!consumer.invoker.active) {
				consumer.invoker.active = true;
			}
		}

	}
	
	*//**
	 * //消息的标识，false只确认当前一个消息收到，true确认所有consumer获得的消息
		channel.basicAck(getDeliveryTag(), false);
		//ack返回false，并重新回到队列，api里面解释得很清楚
		channel.basicNack(getDeliveryTag(), false, true);
		//拒绝消息
		channel.basicReject(getDeliveryTag(), true);
	 *
	 *//*

	class ListenerInvoker implements Callable<ListenerInvoker> {

		private final CountDownLatch exitLatch = new CountDownLatch(1);

		private volatile boolean active = true;

		ListenerInvoker() {
		}

		@Override
		public ListenerInvoker call() {
			try {
				RabbitConsumerConfig rabbitConsumerConfig = ((RabbitConsumerConfig) managerConsumerConfig
						.getConsumer());
				ConnectionFactory factory = new ConnectionFactory();
				factory.setHost(getHost(rabbitConsumerConfig));
				if (rabbitConsumerConfig.getPort() != null && rabbitConsumerConfig.getPort() > 0) {
					factory.setPort(rabbitConsumerConfig.getPort());
				}
				if (StringUtil.isNotEmpty(rabbitConsumerConfig.getUsername())) {
					factory.setUsername(rabbitConsumerConfig.getUsername());
				}

				if (StringUtil.isNotEmpty(rabbitConsumerConfig.getPassword())) {
					factory.setPassword(rabbitConsumerConfig.getPassword());
				}

				Connection connection = factory.newConnection();
				Channel channel = connection.createChannel();

				String queueName = rabbitConsumerConfig.getQueue();

				switch (XT.valueOf(rabbitConsumerConfig.getType())) {
				case DEFAULT:
					// 队列的相关参数需要与第一次定义该队列时相同，否则会出错，使用channel.queueDeclarePassive()可只被动绑定已有队列，而不创建
					channel.queueDeclare(queueName, rabbitConsumerConfig.isDurable(),
							rabbitConsumerConfig.isExclusive(), rabbitConsumerConfig.isAutoDelete(), null);
					break;
				case FANOUT:
					// 接收端也声明一个fanout交换机
					channel.exchangeDeclare(rabbitConsumerConfig.getExchange(), "fanout",
							rabbitConsumerConfig.isDurable(), rabbitConsumerConfig.isAutoDelete(), null);
					// channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
					// 声明一个临时队列，该队列会在使用完比后自动销毁
					queueName = channel.queueDeclare().getQueue();
					// 将队列绑定到交换机,参数3无意义此时
					channel.queueBind(queueName, rabbitConsumerConfig.getExchange(), "");
					break;
				case DIRECT:
					channel.exchangeDeclare(rabbitConsumerConfig.getExchange(), "direct", rabbitConsumerConfig.isDurable(),
							rabbitConsumerConfig.isAutoDelete(), null);
					queueName = channel.queueDeclare().getQueue();
					for(String value : rabbitConsumerConfig.getQueueBindList()) {
						channel.queueBind(queueName, rabbitConsumerConfig.getExchange(), value); // 绑定一个routing key，可以绑定多个
					}
					break;
				case TOPIC:
					channel.exchangeDeclare(rabbitConsumerConfig.getExchange(), "topic",
							rabbitConsumerConfig.isDurable(), rabbitConsumerConfig.isAutoDelete(), null);
					queueName = channel.queueDeclare().getQueue();
					for(String value : rabbitConsumerConfig.getQueueBindList()) {
						channel.queueBind(queueName, rabbitConsumerConfig.getExchange(), value); // 监听两种模式 #匹配一个或多个单词 *匹配一个单词
					}
					break;
				case HEADERS:
					channel.exchangeDeclare(rabbitConsumerConfig.getExchange(), "headers",
							rabbitConsumerConfig.isDurable(), rabbitConsumerConfig.isAutoDelete(), null);
					queueName = channel.queueDeclare().getQueue();

					channel.queueBind(queueName, rabbitConsumerConfig.getExchange(), "",
							rabbitConsumerConfig.getHeaderMap());
					break;
				}

				// 在同一时间不要给一个worker一个以上的消息。
				// 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志（acknowledged）
				// 替代的，消息将会分发给下一个不忙的worker。
				channel.basicQos(rabbitConsumerConfig.getPrefetchCount()); 

				// 用来缓存服务器推送过来的消息
				QueueingConsumer consumer = new QueueingConsumer(channel);

				channel.basicConsume(queueName, rabbitConsumerConfig.isAutoAck(), consumer);
				// 使用该函数主动去服务器检索是否有新消息，而不是等待服务器推送
				// channel.basicGet()

				while (this.active) {
					QueueingConsumer.Delivery delivery = consumer.nextDelivery();
					try {
						if (!this.active) {
							if (logger.isTraceEnabled()) {
								logger.trace("No records to process");
							}
							continue;
						}
						if (delivery == null || delivery.getBody() == null || delivery.getBody().length <= 0) {
							continue;
						}
						MQEvent<Object> message = new MQEvent<Object>();
						if (StringUtil.isNotEmpty(managerConsumerConfig.getParamType())) {
							try {
								if (!"java.lang.String".equals(managerConsumerConfig.getParamType())) {
									Class<?> cl = Class.forName(managerConsumerConfig.getParamType());
									message.setContent(
											SerializationFactory.factory(managerConsumerConfig.getTransfer())
													.deserializer(delivery.getBody(), cl));
								} else {
									message.setContent(new String(delivery.getBody(),"UTF-8"));
								}

							} catch (Exception e) {
								logger.error(e.getMessage() + new String(delivery.getBody(),"UTF-8"), e);
							}
						} else {
							message.setContent(new String(delivery.getBody(),"UTF-8"));
						}
						ConsumeResultStatus status = managerListener.notifyListener(managerConsumerConfig, message);
						if (ConsumeResultStatus.SUCCESS == status) {
							channel.basicAck(delivery.getEnvelope().getDeliveryTag(), rabbitConsumerConfig.isAutoAck());
						}else {
							channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
						}
					} catch (Exception e) {
						if (!this.active) {
							Thread.currentThread().interrupt();
						} else {
							if (logger.isDebugEnabled()) {
								logger.debug("Interrupt ignored");
							}
						}
					}
				}
			} catch (Exception e) {
				logger.error(e.getMessage());
			} finally {
				this.active = false;
				this.exitLatch.countDown();
			}
			return this;
		}

	}

	private String getHost(RabbitConsumerConfig rabbitConsumerConfig) {
		final String address = rabbitConsumerConfig.getAddress();
		if (StringUtil.isNotEmpty(address) && address.indexOf(":") <= -1) {
			return address;
		}
		return address.substring(0, address.indexOf(":"));
	}

}
*/